#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file

import logging
import re
import time
from typing import Optional
from typing import Tuple

from apache_beam import version as beam_version
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive.dataproc.types import ClusterMetadata
from apache_beam.runners.interactive.utils import obfuscate
from apache_beam.runners.interactive.utils import progress_indicated

try:
  from google.cloud import dataproc_v1

  from apache_beam.io.gcp import gcsfilesystem  # pylint: disable=ungrouped-imports
except ImportError:

  class UnimportedDataproc:
    Cluster = None

  dataproc_v1 = UnimportedDataproc()

_LOGGER = logging.getLogger(__name__)

# Name of the log file auto-generated by Dataproc. We use it to locate the
# startup output of the Flink daemon to retrieve master url and dashboard
# information.
DATAPROC_STAGING_LOG_NAME = 'dataproc-initialization-script-0_output'

# Home dir of os user yarn.
YARN_HOME = '/var/lib/hadoop-yarn'

# Configures the os user yarn to use gcloud as the docker credHelper.
# Also sets some taskmanager configurations for better parallelism.
# Finally starts the yarn application: flink cluster in session mode.
INIT_ACTION = """#!/bin/bash
sudo -u yarn gcloud auth configure-docker --quiet

readonly FLINK_INSTALL_DIR='/usr/lib/flink'
readonly MASTER_HOSTNAME="$(/usr/share/google/get_metadata_value attributes/dataproc-master)"

cat <<EOF >>${FLINK_INSTALL_DIR}/conf/flink-conf.yaml
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
EOF
sed -i \
    "s/^taskmanager.network.numberOfBuffers: 2048/taskmanager.network.numberOfBuffers: 8192/" \
    ${FLINK_INSTALL_DIR}/conf/flink-conf.yaml

if [[ "${HOSTNAME}" == "${MASTER_HOSTNAME}" ]]; then
  . /usr/bin/flink-yarn-daemon
fi
"""


class DataprocClusterManager:
  """Self-contained cluster manager that controls the lifecyle of a Dataproc
  cluster connected by one or more pipelines under Interactive Beam.
  """
  def __init__(self, cluster_metadata: ClusterMetadata) -> None:
    """Initializes the DataprocClusterManager with properties required
    to interface with the Dataproc ClusterControllerClient.
    """
    self.cluster_metadata = cluster_metadata
    # Pipelines whose jobs are executed on the cluster.
    self.pipelines = set()
    self._cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={
            'api_endpoint': \
            f'{self.cluster_metadata.region}-dataproc.googleapis.com:443'
        })
    self._fs = gcsfilesystem.GCSFileSystem(PipelineOptions())
    self._staging_directory = None
    cache_dir = ie.current_env().options.cache_root
    if not cache_dir.startswith('gs://'):
      error_msg = (
          'ib.options.cache_root needs to be a Cloud Storage '
          'Bucket to cache source recording and PCollections in current '
          f'interactive setup, instead \'{cache_dir}\' is assigned.')
      _LOGGER.error(error_msg)
      raise ValueError(error_msg)
    self._cache_root = cache_dir.rstrip('/')

  def stage_init_action(self) -> str:
    """Stages the initialization action script to GCS cache root to set up
    Dataproc clusters.

    Returns the staged gcs file path.
    """
    # Versionizes the initialization action script.
    init_action_ver = obfuscate(INIT_ACTION)
    path = f'{self._cache_root}/dataproc-init-action-{init_action_ver}.sh'
    if not self._fs.exists(path):
      with self._fs.create(path) as bwriter:
        bwriter.write(INIT_ACTION.encode())
    return path

  @progress_indicated
  def create_cluster(self, cluster: dict) -> None:
    """Attempts to create a cluster using attributes that were
    initialized with the DataprocClusterManager instance.

    Args:
      cluster: Dictionary representing Dataproc cluster. Read more about the
          schema for clusters here:
          https://cloud.google.com/python/docs/reference/dataproc/latest/google.cloud.dataproc_v1.types.Cluster
    """
    if self.cluster_metadata.master_url:
      return
    try:
      self._cluster_client.create_cluster(
          request={
              'project_id': self.cluster_metadata.project_id,
              'region': self.cluster_metadata.region,
              'cluster': cluster
          })
    except Exception as e:
      if e.code == 409:
        _LOGGER.info(
            'Cluster %s already exists. Continuing...',
            self.cluster_metadata.cluster_name)
      elif e.code == 403:
        _LOGGER.error(
            'Due to insufficient project permissions, '
            'unable to create cluster: %s',
            self.cluster_metadata.cluster_name)
        raise ValueError(
            'You cannot create a cluster in project: {}'.format(
                self.cluster_metadata.project_id))
      elif e.code == 501:
        _LOGGER.error(
            'Invalid region provided: %s', self.cluster_metadata.region)
        raise ValueError(
            'Region {} does not exist!'.format(self.cluster_metadata.region))
      else:
        _LOGGER.error(
            'Unable to create cluster: %s', self.cluster_metadata.cluster_name)
        raise e
    else:
      _LOGGER.info(
          'Cluster created successfully: %s',
          self.cluster_metadata.cluster_name)
      self._staging_directory = self.get_staging_location()
      master_url, dashboard = self.get_master_url_and_dashboard()
      self.cluster_metadata.master_url = master_url
      self.cluster_metadata.dashboard = dashboard

  def create_flink_cluster(self) -> None:
    """Calls _create_cluster with a configuration that enables FlinkRunner."""
    init_action_path = self.stage_init_action()
    # https://cloud.google.com/php/docs/reference/cloud-dataproc/latest/V1.Cluster
    cluster = {
        'project_id': self.cluster_metadata.project_id,
        'cluster_name': self.cluster_metadata.cluster_name,
        'config': {
            'software_config': {
                # TODO(https://github.com/apache/beam/issues/21527): Uncomment
                # these lines when a Dataproc image is released with previously
                # missing dependencies.
                # 'image_version': ie.current_env().clusters.
                # DATAPROC_IMAGE_VERSION,
                'optional_components': ['DOCKER', 'FLINK'],
                'properties': {
                    # Enforces HOME dir for user yarn.
                    'yarn:yarn.nodemanager.user-home-dir': YARN_HOME
                }
            },
            'initialization_actions': [{
                'executable_file': init_action_path
            }],
            'gce_cluster_config': {
                'metadata': {
                    'flink-start-yarn-session': 'false'
                },
                'service_account_scopes': [
                    'https://www.googleapis.com/auth/cloud-platform'
                ],
                'internal_ip_only': False
            },
            'master_config': {
                # There must be 1 and only 1 instance of master.
                'num_instances': 1
            },
            'worker_config': {},
            'endpoint_config': {
                'enable_http_port_access': True
            }
        },
        'labels': {
            'goog-dataflow-notebook': beam_version.__version__.replace(
                '.', '_')
        }
    }

    # Additional gce_cluster_config.
    gce_cluster_config = cluster['config']['gce_cluster_config']
    if self.cluster_metadata.subnetwork:
      gce_cluster_config['subnetwork_uri'] = self.cluster_metadata.subnetwork

    # Additional InstanceGroupConfig for master and workers.
    master_config = cluster['config']['master_config']
    worker_config = cluster['config']['worker_config']
    if self.cluster_metadata.num_workers:
      worker_config['num_instances'] = self.cluster_metadata.num_workers
    if self.cluster_metadata.machine_type:
      master_config['machine_type_uri'] = self.cluster_metadata.machine_type
      worker_config['machine_type_uri'] = self.cluster_metadata.machine_type

    self.create_cluster(cluster)

  def cleanup(self) -> None:
    """Deletes the cluster that uses the attributes initialized
    with the DataprocClusterManager instance."""
    try:
      self._cluster_client.delete_cluster(
          request={
              'project_id': self.cluster_metadata.project_id,
              'region': self.cluster_metadata.region,
              'cluster_name': self.cluster_metadata.cluster_name,
          })
      self.cleanup_staging_files()
    except Exception as e:
      if e.code == 403:
        _LOGGER.error(
            'Due to insufficient project permissions, '
            'unable to clean up the default cluster: %s',
            self.cluster_metadata.cluster_name)
        raise ValueError(
            'You cannot delete a cluster in project: {}'.format(
                self.cluster_metadata.project_id))
      elif e.code == 404:
        _LOGGER.error(
            'Cluster does not exist: %s', self.cluster_metadata.cluster_name)
        raise ValueError(
            'Cluster was not found: {}'.format(
                self.cluster_metadata.cluster_name))
      else:
        _LOGGER.error(
            'Failed to delete cluster: %s', self.cluster_metadata.cluster_name)
        raise e

  def get_cluster_details(self) -> dataproc_v1.Cluster:
    """Gets the Dataproc_v1 Cluster object for the current cluster manager."""
    try:
      return self._cluster_client.get_cluster(
          request={
              'project_id': self.cluster_metadata.project_id,
              'region': self.cluster_metadata.region,
              'cluster_name': self.cluster_metadata.cluster_name
          })
    except Exception as e:
      if e.code == 403:
        _LOGGER.error(
            'Due to insufficient project permissions, '
            'unable to retrieve information for cluster: %s',
            self.cluster_metadata.cluster_name)
        raise ValueError(
            'You cannot view clusters in project: {}'.format(
                self.cluster_metadata.project_id))
      elif e.code == 404:
        _LOGGER.error(
            'Cluster does not exist: %s', self.cluster_metadata.cluster_name)
        raise ValueError(
            'Cluster was not found: {}'.format(
                self.cluster_metadata.cluster_name))
      else:
        _LOGGER.error(
            'Failed to get information for cluster: %s',
            self.cluster_metadata.cluster_name)
        raise e

  def wait_for_cluster_to_provision(self) -> None:
    while self.get_cluster_details().status.state.name == 'CREATING':
      time.sleep(15)

  def get_staging_location(self) -> str:
    """Gets the staging bucket of an existing Dataproc cluster."""
    try:
      self.wait_for_cluster_to_provision()
      cluster_details = self.get_cluster_details()
      bucket_name = cluster_details.config.config_bucket
      gcs_path = 'gs://' + bucket_name + '/google-cloud-dataproc-metainfo/'
      for file in self._fs._list(gcs_path):
        if self.cluster_metadata.cluster_name in file.path:
          # this file path split will look something like:
          # ['gs://.../google-cloud-dataproc-metainfo/{staging_dir}/',
          # '-{node-type}/dataproc-initialization-script-0_output']
          return file.path.split(self.cluster_metadata.cluster_name)[0]
    except Exception as e:
      _LOGGER.error(
          'Failed to get %s cluster staging bucket.',
          self.cluster_metadata.cluster_name)
      raise e

  def parse_master_url_and_dashboard(self, line: str) -> Tuple[str, str]:
    """Parses the master_url and YARN application_id of the Flink process from
    an input line. The line containing both the master_url and application id
    is always formatted as such:
    {text} Found Web Interface {master_url} of application
    '{application_id}'.\\n

    Truncated example where '...' represents additional text between segments:
    ... google-dataproc-startup[000]: ... activate-component-flink[0000]:
    ...org.apache.flink.yarn.YarnClusterDescriptor... [] -
    Found Web Interface example-master-url:50000 of application
    'application_123456789000_0001'.

    Returns the flink_master_url and dashboard link as a tuple."""
    cluster_details = self.get_cluster_details()
    yarn_endpoint = cluster_details.config.endpoint_config.http_ports[
        'YARN ResourceManager']
    segment = line.split('Found Web Interface ')[1].split(' of application ')
    master_url = segment[0]
    application_id = re.sub('\'|.\n', '', segment[1])
    dashboard = re.sub(
        '/yarn/',
        '/gateway/default/yarn/proxy/' + application_id + '/',
        yarn_endpoint)
    return master_url, dashboard

  def get_master_url_and_dashboard(self) -> Tuple[Optional[str], Optional[str]]:
    """Returns the master_url of the current cluster."""
    startup_logs = []
    for file in self._fs._list(self._staging_directory):
      if DATAPROC_STAGING_LOG_NAME in file.path:
        startup_logs.append(file.path)

    for log in startup_logs:
      content = self._fs.open(log)
      for line in content.readlines():
        decoded_line = line.decode()
        if 'Found Web Interface' in decoded_line:
          return self.parse_master_url_and_dashboard(decoded_line)
    return None, None

  def cleanup_staging_files(self) -> None:
    if self._staging_directory:
      staging_files = [
          file.path for file in self._fs._list(self._staging_directory)
      ]
      self._fs.delete(staging_files)
    if self._cache_root:
      cache_files = [file.path for file in self._fs._list(self._cache_root)]
      self._fs.delete(cache_files)
